/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.bff.gaia.unified.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.EncryptionConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableReference;
import com.bff.gaia.unified.sdk.transforms.DoFn;
import com.bff.gaia.unified.sdk.transforms.display.DisplayData;
import com.bff.gaia.unified.sdk.values.KV;
import com.bff.gaia.unified.sdk.values.PCollectionView;
import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ArrayListMultimap;
import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Lists;
import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Multimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * Copies temporary tables to the destination table. The input element is an {@link Iterable} that
 * provides the list of all temporary tables created for a given {@link TableDestination}.
 *
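 * <p>A rough sketch of how this {@code DoFn} is typically wired up. The surrounding names and
 * values ({@code groupedTempTables}, {@code jobIdTokenView}, the dispositions, and the use of
 * {@code ParDo} with a side input) are illustrative assumptions, not prescribed by this class:
 *
 * <pre>{@code
 * PCollection<Iterable<KV<TableDestination, String>>> groupedTempTables = ...;
 * groupedTempTables.apply(
 *     "WriteRename",
 *     ParDo.of(new WriteRename(
 *             bqServices,
 *             jobIdTokenView,
 *             BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE,
 *             BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED,
 *             3,     // maxRetryJobs
 *             null)) // kmsKey; null means no CMEK
 *         .withSideInputs(jobIdTokenView));
 * }</pre>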
 */
class WriteRename extends DoFn<Iterable<KV<TableDestination, String>>, Void> {
  private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class);

  private final BigQueryServices bqServices;
  private final PCollectionView<String> jobIdToken;

  // In the triggered scenario, the user-supplied create and write dispositions only apply to the
  // first trigger pane, as that's when the table is created. Subsequent loads should always
  // append to the table, and so use the CREATE_NEVER and WRITE_APPEND dispositions, respectively.
  private final BigQueryIO.Write.WriteDisposition firstPaneWriteDisposition;
  private final BigQueryIO.Write.CreateDisposition firstPaneCreateDisposition;
  private final int maxRetryJobs;
  private final String kmsKey;

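  /**
   * State for one in-flight copy job: the retryable job handle, the final destination it writes
   * to, and the temporary tables to delete once the copy commits.
   */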
  private static class PendingJobData {
    final BigQueryHelpers.PendingJob retryJob;
    final TableDestination tableDestination;
    final List<TableReference> tempTables;

    public PendingJobData(
        BigQueryHelpers.PendingJob retryJob,
        TableDestination tableDestination,
        List<TableReference> tempTables) {
      this.retryJob = retryJob;
      this.tableDestination = tableDestination;
      this.tempTables = tempTables;
    }
  }
  // All pending copy jobs.
  private List<PendingJobData> pendingJobs = Lists.newArrayList();

  public WriteRename(
      BigQueryServices bqServices,
      PCollectionView<String> jobIdToken,
      BigQueryIO.Write.WriteDisposition writeDisposition,
      BigQueryIO.Write.CreateDisposition createDisposition,
      int maxRetryJobs,
      String kmsKey) {
    this.bqServices = bqServices;
    this.jobIdToken = jobIdToken;
    this.firstPaneWriteDisposition = writeDisposition;
    this.firstPaneCreateDisposition = createDisposition;
    this.maxRetryJobs = maxRetryJobs;
    this.kmsKey = kmsKey;
  }

  @StartBundle
  public void startBundle(StartBundleContext c) {
    pendingJobs.clear();
  }

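  /**
   * Groups the incoming temp tables by destination and starts one copy job per destination that
   * has at least one temp table to copy.
   */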
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    Multimap<TableDestination, String> tempTables = ArrayListMultimap.create();
    for (KV<TableDestination, String> entry : c.element()) {
      tempTables.put(entry.getKey(), entry.getValue());
    }
    for (Map.Entry<TableDestination, Collection<String>> entry : tempTables.asMap().entrySet()) {
      // Process each destination table.
      // Do not copy if no temp tables are provided.
      if (!entry.getValue().isEmpty()) {
        pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c));
      }
    }
  }

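  /**
   * Waits for all pending copy jobs to complete. After each job succeeds, patches the destination
   * table's description (if one was supplied) and deletes that job's temporary tables.
   */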
  @FinishBundle
  public void finishBundle(FinishBundleContext c) throws Exception {
    BigQueryServices.DatasetService datasetService =
        bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
    BigQueryHelpers.PendingJobManager jobManager = new BigQueryHelpers.PendingJobManager();
    for (PendingJobData pendingJob : pendingJobs) {
      jobManager.addPendingJob(
          pendingJob.retryJob,
          j -> {
            try {
              if (pendingJob.tableDestination.getTableDescription() != null) {
                TableReference ref = pendingJob.tableDestination.getTableReference();
                datasetService.patchTableDescription(
                    ref.clone()
                        .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
                    pendingJob.tableDestination.getTableDescription());
              }
              removeTemporaryTables(datasetService, pendingJob.tempTables);
              return null;
            } catch (IOException | InterruptedException e) {
              return e;
            }
          });
    }
    jobManager.waitForDone();
  }

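  /**
   * Kicks off a single copy job for one destination. The first trigger pane uses the
   * user-supplied dispositions; every later pane appends to the already-created table.
   */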
  private PendingJobData startWriteRename(
      TableDestination finalTableDestination, Iterable<String> tempTableNames, ProcessContext c)
      throws Exception {
    BigQueryIO.Write.WriteDisposition writeDisposition =
        (c.pane().getIndex() == 0)
            ? firstPaneWriteDisposition
            : BigQueryIO.Write.WriteDisposition.WRITE_APPEND;
    BigQueryIO.Write.CreateDisposition createDisposition =
        (c.pane().getIndex() == 0)
            ? firstPaneCreateDisposition
            : BigQueryIO.Write.CreateDisposition.CREATE_NEVER;
    List<TableReference> tempTables =
        StreamSupport.stream(tempTableNames.spliterator(), false)
            .map(table -> BigQueryHelpers.fromJsonString(table, TableReference.class))
            .collect(Collectors.toList());

    // Make sure each destination table gets a unique job id.
    String jobIdPrefix =
        BigQueryHelpers.createJobId(
            c.sideInput(jobIdToken), finalTableDestination, -1, c.pane().getIndex());

    BigQueryHelpers.PendingJob retryJob =
        startCopy(
            bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
            bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
            jobIdPrefix,
            finalTableDestination.getTableReference(),
            tempTables,
            writeDisposition,
            createDisposition,
            kmsKey);
    return new PendingJobData(retryJob, finalTableDestination, tempTables);
  }

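  /**
   * Builds a BigQuery table-copy job from the temp tables to the final destination, running in
   * the destination dataset's location, and returns it as a retryable {@link
   * BigQueryHelpers.PendingJob}.
   */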
  private BigQueryHelpers.PendingJob startCopy(
      BigQueryServices.JobService jobService,
      BigQueryServices.DatasetService datasetService,
      String jobIdPrefix,
      TableReference ref,
      List<TableReference> tempTables,
      BigQueryIO.Write.WriteDisposition writeDisposition,
      BigQueryIO.Write.CreateDisposition createDisposition,
      String kmsKey) {
    JobConfigurationTableCopy copyConfig =
        new JobConfigurationTableCopy()
            .setSourceTables(tempTables)
            .setDestinationTable(ref)
            .setWriteDisposition(writeDisposition.name())
            .setCreateDisposition(createDisposition.name());
    if (kmsKey != null) {
      copyConfig.setDestinationEncryptionConfiguration(
          new EncryptionConfiguration().setKmsKeyName(kmsKey));
    }

    String bqLocation =
        BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId());

    String projectId = ref.getProjectId();
    BigQueryHelpers.PendingJob retryJob =
        new BigQueryHelpers.PendingJob(
            // Function to start the copy job.
            jobId -> {
              JobReference jobRef =
                  new JobReference()
                      .setProjectId(projectId)
                      .setJobId(jobId.getJobId())
                      .setLocation(bqLocation);
              LOG.info(
                  "Starting copy job for table {} using {}, job id iteration {}",
                  ref,
                  jobRef,
                  jobId.getRetryIndex());
              try {
                jobService.startCopyJob(jobRef, copyConfig);
              } catch (IOException | InterruptedException e) {
                LOG.warn("Copy job {} failed.", jobRef, e);
                throw new RuntimeException(e);
              }
              return null;
            },
            // Function to poll the result of a copy job.
            jobId -> {
              JobReference jobRef =
                  new JobReference()
                      .setProjectId(projectId)
                      .setJobId(jobId.getJobId())
                      .setLocation(bqLocation);
              try {
                return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
              } catch (InterruptedException e) {
                throw new RuntimeException(e);
              }
            },
            // Function to look up a job.
            jobId -> {
              JobReference jobRef =
                  new JobReference()
                      .setProjectId(projectId)
                      .setJobId(jobId.getJobId())
                      .setLocation(bqLocation);
              try {
                return jobService.getJob(jobRef);
              } catch (InterruptedException | IOException e) {
                throw new RuntimeException(e);
              }
            },
            maxRetryJobs,
            jobIdPrefix);
    return retryJob;
  }

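  /**
   * Best-effort cleanup of the temporary tables: deletion failures are logged and swallowed, so a
   * leaked temp table never fails the pipeline.
   */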
  static void removeTemporaryTables(
      BigQueryServices.DatasetService tableService, List<TableReference> tempTables) {
    for (TableReference tableRef : tempTables) {
      try {
        LOG.debug("Deleting table {}", BigQueryHelpers.toJsonString(tableRef));
        tableService.deleteTable(tableRef);
      } catch (Exception e) {
        LOG.warn("Failed to delete the table {}", BigQueryHelpers.toJsonString(tableRef), e);
      }
    }
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);

    builder
        .add(
            DisplayData.item("firstPaneWriteDisposition", firstPaneWriteDisposition.toString())
                .withLabel("Write Disposition"))
        .add(
            DisplayData.item("firstPaneCreateDisposition", firstPaneCreateDisposition.toString())
                .withLabel("Create Disposition"));
  }
}